Flow 是 cold stream,只有在呼叫 Terminal operator 的時候才會執行。也就是說每一個 Flow 都只有一次性的工作,只要呼叫一次 Terminal operator 就會完成這一次的呼叫。例如我們的 Terminal operator 選擇 collector ,那麼當某個 Flow 執行 collector 之後,就會把 Flow 裏面所有的資料根據我們所設定的動作來執行,最後經過 collector 把結果儲存下來,然後就結束這一次的任務。
Flow 也有可以支援 hot stream 的方式,它的名稱為 SharedFlow,它可以支援多個 Collector 共享 Flow 的發射的內容 (emitted value),所以對於所有的 Collector,只有一次的執行。
那麼,SharedFlow 是如何共享所有發射的內容呢?它是使用廣播(broadcast)的方式。因為 SharedFlow 永遠都不會結束等著廣播發送內容給所有的 Collector,所以稱為 hot stream。
SharedFlow 的介面 interface,如下:
interface SharedFlow<out T> : Flow<T> {
val replayCache: List<T>
}
由上我們得知 SharedFlow 是繼承 Flow 的,其中裏面包含一個函式 replayCache:List<T>
,當有一個新的 collector 加入時,就會根據設定的 replay 數量來把最後的項目廣播給新的 collector 上。
我們可以使用 shareIn
讓原本的 Flow 轉變成 SharedFlow
class Day21 {
val scope = CoroutineScope(Job())
fun sharedFlow(): Flow<Int> = flow {
println("Flow started")
repeat(10) {
delay(100)
emit(it)
}
}.shareIn(
scope,
replay = 10,
started = SharingStarted.WhileSubscribed()
)
}
在上方中,我們在原本的 flow 底下使用 shareIn
來讓這個 flow 轉變成 sharedFlow,其中需要帶三個參數,第一個是帶入一個 CoroutineScope,也就是這個 Flow 所在的 Scope,第二個則是當有新的 collector 加入時,需要重播幾項,最後一個參數 started
則是什麼時候開始啟動。
執行:
@OptIn(InternalCoroutinesApi::class)
fun main() = runBlocking {
val day21 = Day21()
val sharedFlow = day21.sharedFlow()
launch {
sharedFlow.collect {
println("(1): $it")
}
}
delay(500)
launch {
sharedFlow.collect {
println("(2): $it")
}
}
println("done")
}
外層的 coroutine 會先執行 500 毫秒,第一個 launch 執行 500 毫秒時外層 coroutine 會結束 delay,接著第二個 launch 也會跟著執行,但是因為第一個 launch 已經在 500 毫秒內接收了一堆內容,所以這時候 sharedFlow 就要把那些內容發給第二個 launch,等到發完之後,接下來的每一筆資料都會同時傳給兩個 launch。
結果如下:
Flow started
(1): 0
(1): 1
(1): 2
(1): 3
done
(2): 0
(2): 1
(2): 2
(2): 3
(1): 4
(2): 4
(1): 5
(2): 5
(1): 6
(2): 6
(1): 7
(2): 7
(1): 8
(2): 8
(1): 9
(2): 9
雖然我們的 flow 只有發送 10 個值,但是 sharedFlow 不會因為我們發完之後就停了,它會一直處於執行的狀態,除非所有的 collector 都消失。(在這邊的消失可能是取消或是發生 exception)
另外,這邊的 collector 因為都是在執行之後就開始接收內容,所以 collector 在 sharedFlow 就稱為 subscriber (訂閱者)。
--- 接下篇 ---
有興趣的讀者歡迎參考:https://coroutine.kotlin.tips/
天瓏書局